package com.atguigu.flink.cep;

import com.atguigu.flink.function.WaterSensorMapFunction;
import com.atguigu.flink.pojo.WaterSensor;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.cep.CEP;
import org.apache.flink.cep.PatternSelectFunction;
import org.apache.flink.cep.PatternStream;
import org.apache.flink.cep.pattern.Pattern;
import org.apache.flink.cep.pattern.conditions.SimpleCondition;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.OutputTag;

import java.util.List;
import java.util.Map;

/**
 * Created by Smexy on 2023/2/3
 *
 * <p>Flink CEP demo that routes late elements to a side output instead of
 * feeding them to the pattern matcher.
 *
 * <p>Late: the event time of the current element is smaller than the current watermark.
 */
public class Demo8_Late {

    /** Host of the socket text source. */
    private static final String SOURCE_HOST = "hadoop103";

    /** Port of the socket text source. */
    private static final int SOURCE_PORT = 8888;

    /** Name of the single pattern stage; it is the key of the match map handed to the select function. */
    private static final String RULE_NAME = "规则1";

    /** Identifier of the side output that receives late elements. */
    private static final String LATE_TAG_ID = "late";

    /**
     * Builds and runs the job: socket source, event-time watermarks, a one-stage
     * CEP pattern, and two printing sinks (matches and late elements).
     *
     * @param args command-line arguments; not used
     */
    public static void main(String[] args) {
        StreamExecutionEnvironment execEnv = StreamExecutionEnvironment.getExecutionEnvironment();
        execEnv.setParallelism(1);

        SingleOutputStreamOperator<WaterSensor> sensorStream = readSensors(execEnv);

        Pattern<WaterSensor, WaterSensor> s1Pattern =
            Pattern.<WaterSensor>begin(RULE_NAME).where(new IdIsS1());

        OutputTag<WaterSensor> lateTag =
            new OutputTag<>(LATE_TAG_ID, TypeInformation.of(WaterSensor.class));

        // Use a side output to receive the late data.
        PatternStream<WaterSensor> matches =
            CEP.pattern(sensorStream, s1Pattern).sideOutputLateData(lateTag);

        SingleOutputStreamOperator<String> renderedMatches = matches.select(new MatchToString());

        // Main stream: the rendered matches.
        renderedMatches.print().setParallelism(1);

        // The side output is obtained from the main stream.
        renderedMatches.getSideOutput(lateTag).print().setParallelism(1);

        try {
            execEnv.execute();
        } catch (Exception failure) {
            failure.printStackTrace();
        }
    }

    /**
     * Creates the sensor stream: text lines from the socket source are mapped with
     * {@link WaterSensorMapFunction} and then given timestamps and watermarks.
     *
     * @param execEnv environment the source is registered on
     * @return the stream of sensors with timestamps taken from {@code WaterSensor#getTs()}
     */
    private static SingleOutputStreamOperator<WaterSensor> readSensors(StreamExecutionEnvironment execEnv) {
        // Monotonous-timestamps strategy; the event timestamp comes from the sensor itself.
        WatermarkStrategy<WaterSensor> ascendingTimestamps = WatermarkStrategy
            .<WaterSensor>forMonotonousTimestamps()
            .withTimestampAssigner((sensor, recordTs) -> sensor.getTs());

        return execEnv
            .socketTextStream(SOURCE_HOST, SOURCE_PORT)
            .map(new WaterSensorMapFunction())
            .assignTimestampsAndWatermarks(ascendingTimestamps);
    }

    /** Matching rule: accepts only sensors whose id equals {@code "s1"}. */
    private static final class IdIsS1 extends SimpleCondition<WaterSensor> {

        @Override
        public boolean filter(WaterSensor value) throws Exception {
            return "s1".equals(value.getId());
        }
    }

    /** Renders a match (stage name mapped to its matched sensors) via {@code Map#toString()}. */
    private static final class MatchToString implements PatternSelectFunction<WaterSensor, String> {

        @Override
        public String select(Map<String, List<WaterSensor>> map) throws Exception {
            return map.toString();
        }
    }
}
